package org.elastic.compreplatform.crawler.core.elasticsearch;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.StatusLine;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.entity.ContentType;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.apache.http.nio.entity.NStringEntity;
import org.elastic.compreplatform.conf.core.core.ConfZkConf;
import org.elastic.compreplatform.crawler.exception.ESIndexException;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.RestStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.fastjson.JSON;

/**
 * ClassName: ESRestClient
 * @Description: Static helper for Elasticsearch index operations over the REST clients.
 * Index operation APIs:
 * low level https://www.elastic.co/guide/en/elasticsearch/client/java-rest/5.6/java-rest-low.html
 * high level https://www.elastic.co/guide/en/elasticsearch/client/java-rest/5.6/java-rest-high.html
 * @author JornTang
 * @date 2017-12-21
 */
public class ESRestClient {
	private static final Logger log = LoggerFactory.getLogger(ESRestClient.class);
	/** Port used for a configured host entry without an explicit ":port" suffix (Elasticsearch's default HTTP port). */
	private static final int DEFAULT_ES_PORT = 9200;
	private static RestClient lowLevelRestClient = null;
	private static RestHighLevelClient highLevelRestClient = null;
	/** Timeout read from configuration; handed to the HTTP client's millisecond-based timeout setters. */
	private static int es_client_timeout = Integer.valueOf(ConfZkConf.get("elastic_compre.es.client.timeout"));
	/** Comma-separated list of "host:port" entries read from configuration. */
	private static String es_url = ConfZkConf.get("elastic_compre.es.url");
	static {
		init();
	}

	/**
	 * Builds the low level and high level REST clients from the configured host list and timeout.
	 * <p>
	 * Runs once from the static initializer. If it is called again, the new clients replace the
	 * old ones and the previously built low level client is closed so that its connections and
	 * I/O threads are released; references to the old client obtained earlier through the getters
	 * must not be used after that.
	 *
	 * @throws IllegalStateException if the configured host list is null, blank or contains no usable entry
	 * @throws NumberFormatException if a configured port is not a valid integer
	 */
	public static void init(){
		List<HttpHost> httpHosts = parseHosts(es_url);
		if(httpHosts.isEmpty()){
			throw new IllegalStateException("No Elasticsearch hosts configured (elastic_compre.es.url): " + es_url);
		}
		// Size the array from the parsed list, never from the raw string: the two can differ
		// (blank entries, trailing commas) and a null slot is rejected by the builder.
		RestClientBuilder builder = RestClient.builder(httpHosts.toArray(new HttpHost[0]));
		builder.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
			@Override
			public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder requestConfigBuilder) {
				requestConfigBuilder.setConnectTimeout(es_client_timeout);
				requestConfigBuilder.setSocketTimeout(es_client_timeout);
				requestConfigBuilder.setConnectionRequestTimeout(es_client_timeout);
				return requestConfigBuilder;
			}
		});

		builder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
			@Override
			public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
				return httpClientBuilder.setDefaultIOReactorConfig(
						IOReactorConfig.custom()
						.setIoThreadCount(100)// number of I/O dispatch threads
						.setConnectTimeout(es_client_timeout)
						.setSoTimeout(es_client_timeout)
						.build());
			}
		});

		// Keep the retry budget aligned with the socket timeout
		builder.setMaxRetryTimeoutMillis(es_client_timeout);

		RestClient previous = lowLevelRestClient;
		// Build the low level client
		lowLevelRestClient = builder.build();
		// Build the high level client on top of it
		highLevelRestClient = new RestHighLevelClient(lowLevelRestClient);
		closePrevious(previous);
		log.info("ESRestClient 初始化完成。。。。。。。。。");
	}

	/**
	 * Parses a comma-separated "host:port" list into {@link HttpHost} instances.
	 * Entries are trimmed, empty entries are skipped, and an entry without a port
	 * falls back to {@link #DEFAULT_ES_PORT}.
	 *
	 * @param urls raw configuration value; may be null or blank
	 * @return the parsed hosts, empty when nothing usable was configured
	 */
	private static List<HttpHost> parseHosts(String urls){
		List<HttpHost> httpHosts = new ArrayList<HttpHost>();
		if(StringUtils.isBlank(urls)){
			return httpHosts;
		}
		for (String entry : urls.split(",")) {
			String trimmed = entry.trim();
			if(trimmed.isEmpty()){
				continue;
			}
			String[] hostAndPort = trimmed.split(":");
			if(hostAndPort.length == 0){
				throw new IllegalArgumentException("Invalid Elasticsearch host entry: " + trimmed);
			}
			int port = hostAndPort.length > 1 ? Integer.parseInt(hostAndPort[1].trim()) : DEFAULT_ES_PORT;
			httpHosts.add(new HttpHost(hostAndPort[0].trim(), port));
		}
		return httpHosts;
	}

	/**
	 * Closes a client that has been replaced by a re-initialization. A failure to close is
	 * logged and otherwise ignored because the new client is already in place.
	 *
	 * @param previous the replaced client, or null when there was none
	 */
	private static void closePrevious(RestClient previous){
		if(previous == null){
			return;
		}
		try {
			previous.close();
		} catch (IOException e) {
			log.warn("Failed to close previous Elasticsearch REST client", e);
		}
	}

	/**
	 * Indexes one document through the low level client with a PUT request.
	 * The document id is taken from the "documentId" entry of {@code mapper}.
	 *
	 * @param idnex index name
	 * @param type mapping type
	 * @param mapper document fields; serialized to JSON as the request body
	 * @throws IOException if the request fails
	 */
	public static void lowLevelPutIndex(String idnex, String type, Map<String, Object> mapper) throws IOException{
		Map<String, String> params = Collections.emptyMap();
		HttpEntity entity = new NStringEntity(JSON.toJSONString(mapper), ContentType.APPLICATION_JSON);
		Response response = lowLevelRestClient.performRequest("PUT", "/"+idnex+"/"+type+"/" + mapper.get("documentId"), params, entity);
		StatusLine statusLine = response.getStatusLine();
		log.info("索引执行put:【{}】{}", statusLine.getStatusCode(), statusLine);
	}

	/**
	 * Deletes one document through the low level client with a DELETE request.
	 *
	 * @param idnex index name
	 * @param type mapping type
	 * @param documentId id of the document to delete
	 * @throws IOException if the request fails
	 */
	public static void lowLevelDeleteIndex(String idnex, String type, String documentId) throws IOException{
		Map<String, String> params = Collections.emptyMap();
		Response response = lowLevelRestClient.performRequest("DELETE", "/"+idnex+"/"+type+"/" + documentId, params);
		StatusLine statusLine = response.getStatusLine();
		log.info("索引执行delete:【{}】{}", statusLine.getStatusCode(), statusLine);
	}

	/**
	 * Creates one document through the high level client (requires JDK 1.8).
	 * The request uses op type CREATE and takes the document id from the
	 * "documentId" entry of {@code mapper}.
	 *
	 * @param idnex index name
	 * @param type mapping type
	 * @param mapper document fields; serialized to JSON as the document source
	 * @throws IOException if the request fails
	 */
	public static void highLevelPutIndex(String idnex, String type, Map<String, Object> mapper) throws IOException{
		IndexRequest request = new IndexRequest(idnex, type, mapper.get("documentId")+"");
		request.source(JSON.toJSONString(mapper), XContentType.JSON);
		request.opType(DocWriteRequest.OpType.CREATE);
		// Synchronous execution
		IndexResponse indexResponse = highLevelRestClient.index(request);
		RestStatus stat = doSuccessful(indexResponse);
		log.info("索引执行put:【{}】{}", stat.getStatus(), stat);
	}

	/**
	 * Deletes one document through the high level client (requires JDK 1.8).
	 *
	 * @param idnex index name
	 * @param type mapping type
	 * @param documentId id of the document to delete
	 * @throws IOException if the request fails
	 */
	public static void highLevelDeleteIndex(String idnex, String type, String documentId) throws IOException{
		DeleteRequest deleteRequest = new DeleteRequest(idnex, type, documentId);
		// Synchronous execution
		DeleteResponse deleteResponse = highLevelRestClient.delete(deleteRequest);
		RestStatus stat = doSuccessful(deleteResponse);
		log.info("索引执行delete:【{}】{}", stat.getStatus(), stat);
	}

	/**
	 * Indexes a batch of documents with explicit ids through the high level client (requires JDK 1.8).
	 * Each document id is read from the "DocumentId" entry of its map, falling back to
	 * "documentId" (the key the single-document methods use) when the former is absent.
	 * Per-item failures reported by Elasticsearch are logged at error level.
	 *
	 * @param idnex index name
	 * @param type mapping type
	 * @param mappers documents to index; must not be null or empty
	 * @throws Exception an {@code ESIndexException} if {@code mappers} is null or empty, or whatever the bulk request throws
	 */
	public static void bulkPutIndex(String idnex, String type, List<Map<String, Object>> mappers) throws Exception{
		if(mappers== null || mappers.isEmpty()){
			throw new ESIndexException("mappers can not be empty");
		}
		BulkRequest request = new BulkRequest();
		for (Map<String, Object> mapper : mappers) {
			request.add(new IndexRequest(idnex, type, resolveDocumentId(mapper))
					.source(JSON.toJSONString(mapper),XContentType.JSON));
		}
		executeBulk(request);
	}

	/**
	 * Indexes a batch of documents without explicit ids through the high level client (requires JDK 1.8).
	 * Per-item failures reported by Elasticsearch are logged at error level.
	 *
	 * @param idnex index name
	 * @param type mapping type
	 * @param mappers documents to index; must not be null or empty
	 * @throws Exception an {@code ESIndexException} if {@code mappers} is null or empty, or whatever the bulk request throws
	 */
	public static void bulkMapIndex(String idnex, String type, List<Map<String,Object>> mappers) throws Exception{
		if(mappers== null || mappers.isEmpty()){
			throw new ESIndexException("mappers can not be empty");
		}
		BulkRequest request = new BulkRequest();
		for (Map<String, Object> mapper : mappers) {
			request.add(new IndexRequest(idnex, type)
					.source(JSON.toJSONString(mapper),XContentType.JSON));
		}
		executeBulk(request);
	}

	/**
	 * Resolves the id for a bulk-indexed document. "DocumentId" is checked first to preserve
	 * the historical behaviour of {@link #bulkPutIndex}; "documentId" is the fallback so that
	 * maps keyed like the single-document methods do not all collapse onto the id "null".
	 *
	 * @param mapper document fields
	 * @return the id as a string; the literal "null" when neither key is present
	 */
	private static String resolveDocumentId(Map<String, Object> mapper){
		Object id = mapper.get("DocumentId");
		if(id == null){
			id = mapper.get("documentId");
		}
		return id + "";
	}

	/**
	 * Executes a bulk request synchronously and logs item failures instead of dropping them.
	 * No exception is raised for item failures so existing callers keep their control flow.
	 *
	 * @param request the populated bulk request
	 * @throws IOException if the request itself fails
	 */
	private static void executeBulk(BulkRequest request) throws IOException{
		BulkResponse bulkResponse = highLevelRestClient.bulk(request);
		if(bulkResponse.hasFailures()){
			log.error("Elasticsearch bulk request completed with failures: {}", bulkResponse.buildFailureMessage());
		}
	}

	/**
	 * Extracts the REST status from a supported response object.
	 *
	 * @param response a {@link DeleteResponse}, {@link IndexResponse}, {@link UpdateResponse} or {@link BulkResponse}
	 * @return the response's status, or null when {@code response} is null or of any other type
	 */
	public static RestStatus doSuccessful(Object response){
		RestStatus status = null;
		if(response instanceof DeleteResponse){
			status = ((DeleteResponse)response).status();
		}else if(response instanceof IndexResponse){
			status = ((IndexResponse)response).status();
		}else if(response instanceof UpdateResponse){
			status = ((UpdateResponse)response).status();
		}else if(response instanceof BulkResponse){
			status = ((BulkResponse)response).status();
		}
		return status;
	}

	/**
	 * @return the shared low level REST client
	 */
	public static RestClient getLowLevelRestClient() {
		return lowLevelRestClient;
	}

	/**
	 * @return the shared high level REST client
	 */
	public static RestHighLevelClient getHighLevelRestClient() {
		return highLevelRestClient;
	}
}
